/****************************************************************************
 * Copyright (c)2010 REMAIN B.V. The Netherlands. (http://www.remainsoftware.com).
 *
 * This program and the accompanying materials are made
 * available under the terms of the Eclipse Public License 2.0
 * which is available at https://www.eclipse.org/legal/epl-2.0/
 *
 *  Contributors:
 *     Wim Jongman - initial API and implementation 
 *     Ahmed Aadel - initial API and implementation     
 *
 * SPDX-License-Identifier: EPL-2.0
 *****************************************************************************/

package org.eclipse.ecf.provider.zookeeper.node.internal;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.eclipse.core.runtime.Assert;
import org.eclipse.ecf.provider.zookeeper.core.ZooDiscoveryContainer;
import org.eclipse.ecf.provider.zookeeper.util.Logger;
import org.eclipse.ecf.provider.zookeeper.util.PrettyPrinter;
import org.osgi.service.log.LogService;

class WriteRoot implements Watcher {
	private ZooKeeper writeKeeper;
	private String ip;
	private WatchManager watchManager;
	private boolean isConnected;
	private Object connectionLock = new Object();

	WriteRoot(String ip, WatchManager watchManager) {
		Assert.isNotNull(ip);
		Assert.isNotNull(watchManager);
		this.ip = ip;
		this.watchManager = watchManager;
		initWriteKeeper();
	}

	@SuppressWarnings({ "incomplete-switch" })
	public void process(final WatchedEvent event) {
		ZooDiscoveryContainer.CACHED_THREAD_POOL.execute(new Runnable() {
			public void run() {
				synchronized (connectionLock) {
					switch (event.getState()) {
					case Disconnected:
						isConnected = false;
						watchManager.unpublishAll();
						connect();
						break;
					case Expired:
						isConnected = false;
						watchManager.unpublishAll();
						connect();
						break;
					case SyncConnected:
						if (!isConnected) {
							isConnected = true;
							watchManager.addZooKeeper(writeKeeper);
							watchManager.republishAll();
						}
						break;
					// ignore @deprecated cases
					}
				}
			}
		});
	}

	private void connect() {
		synchronized (connectionLock) {
			if (this.isConnected || watchManager.isDisposed()) {
				return;
			}
			try {
				if (writeKeeper != null) {
					writeKeeper.close();
					watchManager.removeZooKeeper(writeKeeper);
					writeKeeper = null;
				}
				writeKeeper = new ZooKeeper(this.ip, 3000, this);

			} catch (Exception e) {
				Logger.log(LogService.LOG_DEBUG, e.getMessage(), e);
			}
		}
	}

	private void initWriteKeeper() {
		try {
			if (watchManager.getConfig().isQuorum()
					|| watchManager.getConfig().isStandAlone()) {
				// we write nodes locally but we should check for client port.
				int port = watchManager.getConfig().getClientPort();
				if (port != 0)
					ip += ":" + port;//$NON-NLS-1$
			} else if (watchManager.getConfig().isCentralized()) {
				// we write nodes to the machine with this specified IP address.
				ip = watchManager.getConfig().getServerIps();
			}
			try {
				writeKeeper = new ZooKeeper(this.ip, 3000, this);
			} catch (Exception e) {
				// FATAL
				Logger.log(LogService.LOG_ERROR,
						"Fatal error while initializing a zookeeper client to write to: "
								+ ip, e);
				// halt here before the NPE's get out of house in
				// Publisher.publish()
				throw new IllegalStateException(e);
			}
			while (!this.isConnected) {
				synchronized (connectionLock) {
					if (watchManager.isDisposed()) {
						// no need for connecting, we're disposed.
						try {
							writeKeeper.close();
						} catch (Throwable t) {
							// ignore
						}
						break;
					}
					try {
						Stat s = this.writeKeeper.exists(INode.ROOT, this);
						this.isConnected = true;
						if (s == null) {
							writeKeeper.create(INode.ROOT, new byte[0],
									Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
						}

					} catch (KeeperException e) {
						if (e.code()
								.equals(KeeperException.Code.CONNECTIONLOSS)) {
							isConnected = false;
							PrettyPrinter.attemptingConnectionTo(this.ip);
						} else
							Logger.log(
									LogService.LOG_ERROR,
									"Error while trying to connect to " + this.ip, e); //$NON-NLS-1$
					}
				}
			}
			synchronized (this) {
				this.notifyAll();
			}

		} catch (Exception e) {
			Logger.log(LogService.LOG_DEBUG, e.getMessage(), e);
		}
	}

	public ZooKeeper getWriteKeeper() {
		return writeKeeper;
	}

	public boolean isConnected() {
		return isConnected;
	}

	public WatchManager getWatchManager() {
		return watchManager;
	}
}